Skip to content
0

Reactor - 最佳实践

如何包装一个同步阻塞的调用

很多时候,信息源是同步和阻塞的。在 Reactor 中,我们用以下方式处理这种信息源:

java
Mono blockingWrapper = Mono.fromCallable(() -> { // 1
    return /* make a remote synchronous call */ // 2
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic()); // 3
  1. 使用 fromCallable 方法生成一个 Mono
  2. 返回同步、阻塞的资源
  3. 使用 Schedulers.elastic() 确保对每一个订阅来说运行在一个专门的线程上

因为调用返回一个值,所以你应该使用 Mono。你应该使用 Schedulers.elastic 因为它会创建一个专门的线程来等待阻塞的调用返回。

注意 subscribeOn 方法并不会「订阅」这个 Mono。它只是指定了订阅操作使用哪个 Scheduler

用在 Flux 上的操作符好像没起作用,为啥

请确认你确实对调用 .subscribe() 的发布者应用了这个操作符。

Reactor 的操作符是装饰器(decorators)。它们会返回一个不同的(发布者)实例, 这个实例对上游序列进行了包装并增加了一些的处理行为。所以,最推荐的方式是将操作符「串」起来。

对比下边的两个例子:

没有串起来(不正确的)

java
Flux<String> flux = Flux.just("foo", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); // 1
flux.subscribe(next -> System.out.println("Received: " + next));
  1. 问题在这, flux 变量并没有改变

串起来(正确的)

java
Flux<String> flux = Flux.just("foo", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));

下边的例子更好(因为更简洁):

串起来(最好的)

java
Flux<String> secrets = Flux
  .just("foo", "chain")
  .map(secret -> secret.replaceAll(".", "*"))
  .subscribe(next -> System.out.println("Received: " + next));

第一个例子的输出:

java
Received: foo
Received: chain

后两个例子的输出:

java
Received: ***
Received: *****

Mono zipWith/zipWhen 没有被调用

例子

java
myMethod.process("a") // 这个方法返回 Mono<Void>
        .zipWith(myMethod.process("b"), combinator) //没有被调用
        .subscribe();

如果源 Mono 为空或是一个 Mono<Void>Mono<Void> 通常用于「空」的场景), 下边的组合操作就不会被调用。

对于类似 zipWith 的用于转换的操作符来说,这是比较典型的场景。 这些操作符依赖于数据元素来转换为输出的元素。 如果任何一个序列是空的,则返回的就是一个空序列,所以请谨慎使用。 例如在 then() 之后使用 zipWith() 就会导致这一问题。

对于以 Function 作为参数的 and 更是如此,因为返回的 Mono 是依赖于收到的数据懒加载的(而对于空序列或 Void 的序列来说是没有数据发出来的)。

你可以使用 .defaultIfEmpty(T) 将空序列替换为包含 T 类型缺省值的序列(而不是 Void 序列), 从而可以避免类似的情况出现。举例如下:

zipWhen 前使用 defaultIfEmpty

java
myMethod.emptySequenceForKey("a") // 这个方法返回一个空的 Mono<String>
        .defaultIfEmpty("") // 将空序列转换为包含字符串 "" 的序列
        .zipWhen(aString -> myMethod.process("b")) // 当 "" 发出时被调用
        .subscribe();

如何用 retryWhen 来实现 retry(3) 的效果

retryWhen 方法比较复杂,希望下边的一段模拟 retry(3) 的代码能够帮你更好地理解它的工作方式:

java
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
    .retryWhen(companion -> companion
    .zipWith(Flux.range(1, 4), // 1
          (error, index) -> { // 2
            if (index < 4) return index; // 3
            else throw Exceptions.propagate(error); // 4
          })
    );
  1. 技巧一:使用 zip 和一个「重试个数 + 1」的 range
  2. zip 方法让你可以在对重试次数计数的同时,仍掌握着原始的错误(error)。
  3. 允许三次重试,小于 4 的时候发出一个值。
  4. 为了使序列以错误结束。我们将原始异常在三次重试之后抛出。

如何使用 retryWhen 进行 exponential backoff

Exponential backoff 的意思是进行的多次重试之间的间隔越来越长, 从而避免对源系统造成过载,甚至宕机。基本原理是,如果源产生了一个错误, 那么已经是处于不稳定状态,可能不会立刻复原。所以,如果立刻就重试可能会产生另一个错误, 导致源更加不稳定。

下面是一段实现 exponential backoff 效果的例子,每次重试的间隔都会递增 (伪代码: delay = attempt number * 100 milliseconds):

java
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
    .retryWhen(companion -> companion
        .doOnNext(s -> System.out.println(s + " at " + LocalTime.now())) // 1
        .zipWith(Flux.range(1, 4), (error, index) -> { // 2
          if (index < 4) return index;
          else throw Exceptions.propagate(error);
        })
        .flatMap(index -> Mono.delay(Duration.ofMillis(index * 100))) // 3
        .doOnNext(s -> System.out.println("retried at " + LocalTime.now())) // 4
    );
  1. 记录错误出现的时间
  2. 使用 retryWhen + zipWith 的技巧实现重试 3 次的效果
  3. 通过 flatMap 来实现延迟时间递增的效果
  4. 同样记录重试的时间

订阅它,输出如下:

java
java.lang.IllegalArgumentException at 18:02:29.338
retried at 18:02:29.459 // 1
java.lang.IllegalArgumentException at 18:02:29.460
retried at 18:02:29.663 // 2
java.lang.IllegalArgumentException at 18:02:29.663
retried at 18:02:29.964 // 3
java.lang.IllegalArgumentException at 18:02:29.964
  1. 第一次重试延迟大约 100ms
  2. 第二次重试延迟大约 200ms
  3. 第三次重试延迟大约 300ms

如何使用 publishOn() 确保线程关联性

调度器(Schedulers) 所述,publishOn() 可以用来切换执行线程。 publishOn 能够影响到其之后的操作符的执行线程,直到有新的 publishOn 出现。 所以 publishOn 的位置很重要。

比如下边的例子, map() 中的 transform 方法是在 scheduler1 的一个工作线程上执行的, 而 doOnNext() 中的 processNext 方法是在 scheduler2 的一个工作线程上执行的。 单线程的调度器可能用于对不同阶段的任务或不同的订阅者确保线程关联性。

java
EmitterProcessor<Integer> processor = EmitterProcessor.create();
processor.publishOn(scheduler1)
         .map(i -> transform(i))
         .publishOn(scheduler2)
         .doOnNext(i -> processNext(i))
         .subscribe();
最近更新